package cn.smileyan.demos.core;

import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.entity.TaskResult;
import cn.smileyan.demos.entity.TaskResultData;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

import java.util.LinkedList;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.Collectors;

/**
 * 对 CPU集群 进行检测，检测规则包括
 *    0. 不满足以下条件，无风险
 *    1. 集群中 CPU 采样数据是否存在缺失，比如 CPU 1 采集到了，CPU 2 采集不到
 *    2. 集群中 CPU 平均使用率是否超过阈值（阈值在接收数据中配置）
 *    3. 集群中单个CPU使用率是否超过阈值（阈值在接收数据中配置）
 * @author smileyan
 */
@Slf4j
public class CpuCheckMapFunction implements MapFunction<TaskClusterData, TaskOutput> {
    @Override
    public TaskOutput map(TaskClusterData taskClusterData) {
        TaskOutput taskOutput = new TaskOutput();
        taskOutput.setTaskId(taskClusterData.getTaskId());
        taskOutput.setClusterId(taskClusterData.getClusterId());
        TaskResult taskResult = new TaskResult();
        taskOutput.setResults(taskResult);

        TaskResultData taskResultData = new TaskResultData();
        taskResultData.setTimestamp(taskClusterData.getCpuDataItems().get(0).getTimestamp());
        List<String> items = taskClusterData.getCpuDataItems().stream().map(CpuDataItem::getItemId).collect(Collectors.toList());

        /*
         * 1. 集群中 CPU 采样数据是否存在缺失，比如 CPU 1 采集到了，CPU 2 采集不到
         */
        if (taskClusterData.getClusterSize() != taskClusterData.getCpuDataItems().size()) {
            taskResult.setCode(ResultCodeEnum.MISSING.getCode());
            taskResult.setMessage(ResultCodeEnum.MISSING.getMessage());
            log.info("[{}] timeout while merging", taskClusterData.getTaskId());
            return taskOutput;
        }

        taskResult.setCode(ResultCodeEnum.SUCCESS.getCode());
        taskResult.setMessage(ResultCodeEnum.SUCCESS.getMessage());
        taskResultData.setRiskType(RiskTypeEnum.NONE.getValue());

        /*
         * 2. 集群中 CPU 平均使用率是否超过阈值（阈值在接收数据中配置）
         */
        OptionalDouble average = taskClusterData.getCpuDataItems().stream().mapToDouble(CpuDataItem::getValue).average();
        if (average.isPresent()) {
            if (average.getAsDouble() > taskClusterData.getThresholdConfig().getCpuUsageThresholdAverage()) {
                taskResultData.setRiskItems(items);
                taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_AVERAGE.getValue());
                log.info("[{}] average cpu usage has risks", taskClusterData.getTaskId());
                return taskOutput;
            }
        } else {
            taskResultData.setRiskItems(items);
            log.info("[{}] unknown errors", taskClusterData.getTaskId());
            taskResult.setCode(ResultCodeEnum.UNKNOWN_ERROR.getCode());
            taskResult.setMessage(ResultCodeEnum.UNKNOWN_ERROR.getMessage());
            return taskOutput;
        }

        // 3. 集群中单个CPU使用率是否超过阈值（阈值在接收数据中配置）
        List<String> riskItems = new LinkedList<>();
        for (CpuDataItem cpuDataItem : taskClusterData.getCpuDataItems()) {
            if (cpuDataItem.getValue() > taskClusterData.getThresholdConfig().getCpuUsageThresholdMax()) {
                riskItems.add(cpuDataItem.getItemId());
            }
        }
        if (!riskItems.isEmpty()) {
            taskResultData.setRiskItems(riskItems);
            taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_MAX.getValue());
            log.info("[{}] max cpu usage has risks", taskClusterData.getTaskId());
            return taskOutput;
        }
        taskResult.setData(taskResultData);
        log.info("[{}] results: {}", taskOutput.getTaskId(), taskResult);
        return taskOutput;
    }
}
